import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

public class InsertInto
{
    public static void main(String[] args) throws Exception
    {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> ds1 = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)
        ));

//        DataStream<Order> ds2 = env.fromCollection(Arrays.asList(
//                new Order(1L, "apple", 3),
//                new Order(3L, "car", 2),
//                new Order(1L, "peer", 4)
//        ));

//----------------------------------------------------------------------------------------------------------

//        Table tableA = tEnv.fromDataStream(ds2,$("user"),$("product"),$("amount"));
//        tEnv.registerTable("OutOrders", tableA);

        Table orders  = tEnv.fromDataStream(ds1, $("user"),$("product"),$("amount"));
//        tEnv.createTemporaryView("OutOrders", ds2, $("user"), $("product"), $("amount"));


        TableResult result=tEnv.executeSql("CREATE TABLE OutOrders (`user` BIGINT, product VARCHAR, amount BIGINT) WITH (" +
                "'connector.url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "'connector.type' = 'jdbc',\n" +
                "'connector.table' = 'table_api',\n" +
                "'connector.username' = 'appleyuchi',\n"+
                "'connector.password' = 'appleyuchi',\n" +
                "'connector.write.flush.max-rows' = '1')");

        orders.executeInsert("OutOrders");
        tEnv.toAppendStream(orders, Row.class).print();
        env.execute();
    }


}
